Skip to content

Conversation

samthanawalla
Copy link
Contributor

This CL utilizes the event store to write outgoing messages and removes the unbounded outgoing data structure.

For #190

This CL utilizes the event store to write outgoing messages and removes
the unbounded outgoing data structure.

For modelcontextprotocol#190
@samthanawalla samthanawalla marked this pull request as ready for review August 20, 2025 15:47
@samthanawalla samthanawalla requested review from findleyr and jba August 20, 2025 15:47
for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) {
if err != nil {
// Wait for session initialization before yielding.
if errors.Is(err, ErrUnknownSession) || errors.Is(err, ErrUnknownStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing it this way, I would avoid calling After at all if there is no session or stream.

If there is a session and stream and After returns one of these errors, I think it is a real error and should be yielded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure there is an easy way to do that because the session and stream may exist but it may not exist in the event store yet.

Append could happen before or after the After call which is why we need After to report the error to us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only call After when the client sends Last-Event-ID. If they send it too early, the server should return an error. I don't understand the state where After is called before Append.

Copy link
Contributor Author

@samthanawalla samthanawalla Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only call After when the client sends Last-Event-ID.

We can call After with an index of -1 to start writing from the beginning of the stream which allows us to simplify the logic even if last-event-id is not sent.

I don't understand the state where After is called before Append.

After is called when respondSSE is called which is disjoint from when Append is called in Write. These events can happen in any order which is why we case on ErrUnknownSession and ErrUnknownStream to skip to the logic below which waits for a stream signal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just call Append with nil data when we create the stream?

Here's the problem: I'd like the eventstore to be able to completely clean up the stream or session at will, and so when we get an unknown session or stream, we should fail this connection because it will never be recoverable.

mcp/event.go Outdated
@@ -283,6 +283,12 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID
// index is no longer available.
var ErrEventsPurged = errors.New("data purged")

// ErrUnknownSession is the error that [EventStore.After] should return if the session ID is unknown.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/return/wrap: we don't return this error value exactly, but one that wraps it.


// If all requests have been handled and replied to, we should terminate this connection.
// "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream."
// §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server
// We only want to terminate POSTs, and GETs that are replaying. The general-purpose GET
// (stream ID 0) will never have requests, and should remain open indefinitely.
if nOutstanding == 0 && !persistent {
if nOutstanding == 0 && !persistent && lastIndex >= int(stream.lastWriteIndex.Load()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally thought that this commit reintroduced the bug that was fixed in findleyr@f4a9396. However, I see that it probably doesn't, because of this atomic check.

I think it would be simpler to just move the check for nOutstanding above the After loop above. Then you don't need lastWriteIndex. WDYT?

I prefer to avoid atomics when there's already a synchronization mechanism (mu), because it's hard to reason about the relationship between the atomics and critical sections.

for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) {
if err != nil {
// Wait for session initialization before yielding.
if errors.Is(err, ErrUnknownSession) || errors.Is(err, ErrUnknownStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just call Append with nil data when we create the stream?

Here's the problem: I'd like the eventstore to be able to completely clean up the stream or session at will, and so when we get an unknown session or stream, we should fail this connection because it will never be recoverable.

Copy link
Contributor

@jba jba left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that this PR is confusing me so much.
I think I need to see the algorithm as a whole, without diffs.
So if you think it is correct after addressing my comments that it is correct, I'll approve and then read it separately.

@@ -274,6 +274,12 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID
// Purge before adding, so at least the current data item will be present.
// (That could result in nBytes > maxBytes, but we'll live with that.)
s.purge()

// An empty data slice signals that a stream has been registered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"...signals that the stream has just been created."

This behavior should be documented in EventStore.After.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'd rather have a new method for this, instead of overloading append. Start(ctx, sessionID, streamID)?

IIUC, the purpose of this is so that After will not return an error if it is called before the first Append, is that right?

@@ -675,20 +653,27 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter,
// If the stream did not terminate normally, it is either because ctx was
// cancelled, or the connection is closed: check the ctx.Err() to differentiate
// these cases.
func (c *streamableServerConn) messages(ctx context.Context, stream *stream, persistent bool) iter.Seq2[json.RawMessage, bool] {
return func(yield func(json.RawMessage, bool) bool) {
func (c *streamableServerConn) messages(ctx context.Context, stream *stream, persistent bool, lastIndex int) iter.Seq2[json.RawMessage, error] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc the persistent and lastIndex args.

return
}
// The stream exists, but does not contain any messages on the stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/on the stream//

return
}
// The stream exists, but does not contain any messages on the stream.
// Do not yield this data.
if data == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused. If there are no messages, how did we get inside this loop? Shouldn't After yield nothing at all?

@samthanawalla
Copy link
Contributor Author

Sorry that this PR is confusing me so much. I think I need to see the algorithm as a whole, without diffs. So if you think it is correct after addressing my comments that it is correct, I'll approve and then read it separately.

Sorry I prematurely sent a new commit- not ready for review yet! (The flow is very confusing, I agree)

@samthanawalla samthanawalla marked this pull request as draft August 25, 2025 19:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants